import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {


//    整个UDTAF最终返回的是一个自定义的对象(包含数据在里面)



//    下面的都是Flink的UDTAF要求实现得函数
    @Override
    public Top2Accumulator createAccumulator() {
        Top2Accumulator acc = new Top2Accumulator();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }//新建一个对象然后初始化，这个对象是用来收集(accumulate)数据的
//    聚合对象的初始化
//    创建一个空的accumulator



//    accumulate函数可以重载
//    该函数用来保存当前的聚合结果(官方文档)
    public void accumulate(Top2Accumulator acc, Integer value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }




//该函数的作用是把多个accumulator的结果整合为一个acc
//    Iterable的意思是,即将被整合的一大堆的accumulator
    public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it)
    {
        for (Top2Accumulator otherAcc : it) //迭代器中的每个元素是otherAcc
        {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}